1. Key Kafka components 2. Kafka load balancing 3. Kafka's storage mechanism
1. Spark Streaming overview 2. DStream 3. Hands-on examples 4. Spark on YARN
1. Understand the application scenarios of Spark Streaming 2. Become familiar with DStream 3. Master the Spark Streaming development workflow through the examples 4. Understand when to use Spark on YARN and how to submit jobs to it
Spark Streaming is similar to Apache Storm and is used for processing streaming data. According to its official documentation, Spark Streaming provides high throughput and strong fault tolerance. It supports many input sources, such as Kafka, Flume, Twitter, ZeroMQ and plain TCP sockets. Once data has been ingested, it can be processed with Spark's high-level primitives such as map, reduce, join and window, and the results can be written to many destinations, such as HDFS or databases. Spark Streaming also integrates seamlessly with MLlib (machine learning) and GraphX.
Ease of use
Fault tolerance
Easy integration into the Spark ecosystem
Spark Streaming | Storm |
---|---|
Development language: Scala | Development language: Clojure |
Programming model: DStream | Programming model: Spout/Bolt |
Discretized Stream (DStream) is the basic abstraction of Spark Streaming. It represents a continuous stream of data, either the input stream itself or the stream produced by applying Spark primitives to it. Internally, a DStream is represented as a sequence of consecutive RDDs, each holding the data of one time interval, as shown in the figure below:
Operations on DStreams, like those on RDDs, fall into two categories: Transformations and Output Operations. Among the transformations there are also a few special primitives, such as updateStateByKey(), transform() and the various window-related operations. The common transformations are listed in the table below, followed by a short sketch of a few of them.
Transformation | Meaning |
---|---|
map(func) | Return a new DStream by passing each element of the source DStream through a function func. |
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items. |
filter(func) | Return a new DStream by selecting only the records of the source DStream on which func returns true. |
repartition(numPartitions) | Changes the level of parallelism in this DStream by creating more or fewer partitions. |
union(otherStream) | Return a new DStream that contains the union of the elements in the source DStream and otherDStream. |
count() | Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream. |
reduce(func) | Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel. |
countByValue() | When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream. |
reduceByKey(func, [numTasks]) | When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
join(otherStream, [numTasks]) | When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key. |
cogroup(otherStream, [numTasks]) | When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples. |
transform(func) | Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream. |
updateStateByKey(func) | Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key. |
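As a quick illustration of some of the primitives listed above (union, count and countByValue), here is a minimal sketch; the host name and the two ports are assumptions for illustration only:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamOpsDemo {
  def main(args: Array[String]): Unit = {
    // local[3]: two cores for the two socket receivers plus at least one core for processing
    val conf = new SparkConf().setAppName("DStreamOpsDemo").setMaster("local[3]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Two independent text streams (assumed netcat servers on node01)
    val s1 = ssc.socketTextStream("node01", 8888)
    val s2 = ssc.socketTextStream("node01", 9999)
    val words = s1.union(s2).flatMap(_.split(" ")) // union: merge the two streams
    words.count().print()                          // count: number of elements in each batch
    words.countByValue().print()                   // countByValue: (word, frequency) per batch
    ssc.start()
    ssc.awaitTermination()
  }
}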
Special Transformations
1. UpdateStateByKey Operation
The updateStateByKey operation is used to keep history across batches. The accumulating Word Count example later in this lesson relies on it; without updateStateByKey, the result of each batch is computed, output and then discarded, and no state is carried over to the next batch.
2. Transform Operation
The transform operation allows any RDD-to-RDD function to be applied to a DStream, which makes it easy to use RDD operations that the DStream API does not expose directly. It is also the mechanism through which Spark Streaming is combined with MLlib (machine learning) and GraphX.
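For example, transform() can be used to join every batch of a DStream with a static RDD, a common pattern for blacklist filtering. A minimal sketch, in which the blacklist words, host name and port are assumptions for illustration:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object TransformBlacklistWC {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TransformBlacklistWC").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // A static blacklist RDD; in practice it could be loaded from HDFS or a database
    val blacklist = ssc.sparkContext.parallelize(Seq("foo", "bar")).map((_, true))
    val words: DStream[(String, Int)] =
      ssc.socketTextStream("node01", 8888).flatMap(_.split(" ")).map((_, 1))
    // transform() exposes each batch as an RDD, so arbitrary RDD operations can be applied,
    // here a leftOuterJoin against the static blacklist followed by a filter
    val cleaned: DStream[String] = words.transform { rdd =>
      rdd.leftOuterJoin(blacklist)
        .filter { case (_, (_, flagged)) => flagged.isEmpty }
        .keys
    }
    cleaned.print()
    ssc.start()
    ssc.awaitTermination()
  }
}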
3. Window Operations
Window operations are somewhat similar to state in Storm: by setting the window length and the slide interval you can dynamically obtain the running state of the stream over the current window.
Output operations write the data of a DStream to an external database or file system. Only when an output operation is invoked (analogous to an RDD action) does the streaming program actually start the computation. The available output operations are listed in the table below, followed by a sketch of foreachRDD, the most flexible of them.
Output Operation | Meaning |
---|---|
print() | Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging. |
saveAsTextFiles(prefix, [suffix]) | Save this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". |
saveAsObjectFiles(prefix, [suffix]) | Save this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". |
saveAsHadoopFiles(prefix, [suffix]) | Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". |
foreachRDD(func) | The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs. |
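In practice foreachRDD is the output operation most often used for writing results to an external system. Below is a minimal sketch of the usual connection-per-partition pattern; the MySQL JDBC URL, credentials and the wordcount table are hypothetical and do not appear in the examples that follow:

import java.sql.DriverManager
import org.apache.spark.streaming.dstream.DStream

object ForeachRDDSketch {
  // wordCounts is assumed to be a DStream[(String, Int)], e.g. a Word Count result
  def saveToMySQL(wordCounts: DStream[(String, Int)]): Unit = {
    wordCounts.foreachRDD { rdd =>
      // This closure runs on the driver; the body of foreachPartition runs on the executors
      rdd.foreachPartition { partition =>
        // Open one connection per partition rather than one per record
        val conn = DriverManager.getConnection("jdbc:mysql://node01:3306/test", "user", "passwd")
        val stmt = conn.prepareStatement("INSERT INTO wordcount(word, cnt) VALUES (?, ?)")
        partition.foreach { case (word, cnt) =>
          stmt.setString(1, word)
          stmt.setInt(2, cnt)
          stmt.executeUpdate()
        }
        stmt.close()
        conn.close()
      }
    }
  }
}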
Example: a basic Spark Streaming word count that reads from a TCP socket.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object SparkStreamingWC {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkStreamingWC").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Create the Spark Streaming context with a 5-second batch interval
    val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))
    // Read data from a netcat server (e.g. started with: nc -lk 8888 on node01)
    val dStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 8888)
    // Use the DStream API to compute the word count of each batch
    val res: DStream[(String, Int)] = dStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    res.print()
    // Start the streaming job
    ssc.start()
    // Block the main thread and wait for the job to terminate
    ssc.awaitTermination()
  }
}
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

/**
 * To accumulate results across batches, call updateStateByKey.
 * It takes a user-defined function that combines the historical result with the data of the current batch.
 * In that function, the first element of each tuple is a word,
 * the second is the word's counts within the current batch, e.g. Seq(1, 1, 1, 1),
 * and the third is the accumulated result of previous batches, which may or may not exist yet,
 * so it has to be read with getOrElse.
 */
object SparkStreamingACCWC {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkStreamingACCWC").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Milliseconds(5000))
    // Set the checkpoint directory; updateStateByKey requires checkpointing
    // ssc.checkpoint("hdfs://node01:9000/cp-20180306-1")
    ssc.checkpoint("c://cp-20180306-1")
    // Read the data
    val dStream = ssc.socketTextStream("node01", 8888)
    val tup: DStream[(String, Int)] = dStream.flatMap(_.split(" ")).map((_, 1))
    val res: DStream[(String, Int)] =
      tup.updateStateByKey(func, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    res.print()
    ssc.start()
    ssc.awaitTermination()
  }

  // Add each key's counts in the current batch to the previously accumulated count
  val func = (it: Iterator[(String, Seq[Int], Option[Int])]) => {
    it.map(x => {
      (x._1, x._2.sum + x._3.getOrElse(0))
    })
  }
}
Example: consuming data from Kafka and maintaining an accumulated word count across batches.

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils

object LoadKafkaDataAndWC {
  def main(args: Array[String]): Unit = {
    LoggerLevels.setStreamingLogLevels()
    val conf = new SparkConf().setAppName("LoadKafkaDataAndWC").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Kafka connection parameters passed in on the command line
    val Array(zkQuorum, group, topics, numThreads) = args
    // Set the checkpoint directory
    ssc.checkpoint("c://cp-20180306-2")
    // Build a Map of topic -> number of receiver threads
    val topicMap: Map[String, Int] = topics.split(",").map((_, numThreads.toInt)).toMap
    // Use the KafkaUtils helper to create a receiver-based stream from Kafka
    val data: ReceiverInputDStream[(String, String)] =
      KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    // Each record is a (key, value) pair; only the message value is needed here
    val lines: DStream[String] = data.map(_._2)
    val tup = lines.flatMap(_.split(" ")).map((_, 1))
    val res: DStream[(String, Int)] = tup.updateStateByKey(func, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    res.print()
    ssc.start()
    ssc.awaitTermination()
  }

  // Add each key's counts in the current batch to the previously accumulated count
  val func = (it: Iterator[(String, Seq[Int], Option[Int])]) => {
    it.map {
      case (x, y, z) => {
        (x, y.sum + z.getOrElse(0))
      }
    }
  }
}
Example: word count over a sliding window.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object WindowOperationWC {
  def main(args: Array[String]): Unit = {
    LoggerLevels.setStreamingLogLevels()
    val conf = new SparkConf().setAppName("WindowOperationWC").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("c://cp-20180306-3")
    val dStream = ssc.socketTextStream("node01", 8888)
    val tup = dStream.flatMap(_.split(" ")).map((_, 1))
    // Aggregate over a window: the batch interval is 5 seconds,
    // the window length is 10 seconds and the slide interval is 10 seconds
    val res: DStream[(String, Int)] =
      tup.reduceByKeyAndWindow((x: Int, y: Int) => (x + y), Seconds(10), Seconds(10))
    res.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
Simply put, Spark on YARN means running Spark jobs on the YARN resource-scheduling framework instead of in Standalone mode.
http://spark.apache.org/docs/latest/running-on-yarn.html
1. Install Hadoop: both the HDFS and YARN modules are needed. HDFS is mandatory because Spark uploads its jars to HDFS at runtime.
2. Install Spark: unpack the Spark distribution on one server and edit the spark-env.sh configuration file; this Spark installation acts as a YARN client used to submit jobs. Add the following to spark-env.sh:
export JAVA_HOME=/usr/local/jdk1.7.0_80
export HADOOP_CONF_DIR=/usr/local/hadoop-2.6.4/etc/hadoop
3. Start HDFS and YARN.
Submit a job to YARN in cluster mode (here the SparkPi example that ships with Spark):
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 2 \
--queue default \
lib/spark-examples*.jar \
100
Submit a user application jar (here a Word Count program) in cluster mode:
./bin/spark-submit \
--class com.qf.spark.day1.WordCount \
--master yarn \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 2 \
--queue default \
/home/bigdata/sparkwordcount.jar \
hdfs://node01:9000/wc \
hdfs://node01:9000/out-yarn-1
Submit a job to YARN in client mode:
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 2 \
--queue default \
lib/spark-examples*.jar \
10
spark-shell must be run in client mode:
./bin/spark-shell \
--master yarn \
--deploy-mode client
Cluster mode: the Driver runs inside YARN, so the application's results cannot be displayed on the client; this mode is best suited to applications that write their final results to external storage (such as HDFS, Redis or MySQL) rather than to stdout, and the client terminal only shows a summary of the YARN job's status.
Client mode: the Driver runs on the client, so the application's results are displayed on the client; it is suitable for applications whose results need to be printed, such as spark-shell.
Cluster mode:
The Spark Driver starts as an ApplicationMaster inside the YARN cluster. For every job the client submits to the ResourceManager, a dedicated ApplicationMaster is allocated on one of the cluster's NodeManager nodes, and that ApplicationMaster manages the application for its entire lifetime. The process is as follows:
1. The client submits a request to the ResourceManager and uploads the jars to HDFS. This involves four steps: a) connect to the RM; b) obtain metric, queue and resource information from the RM's ASM (ApplicationsManager); c) upload the application jar and the spark-assembly jar; d) set up the runtime environment and the container context (launch-container.sh and related scripts).
2. The ResourceManager allocates resources on a NodeManager and creates the Spark ApplicationMaster (each SparkContext has its own ApplicationMaster).
3. The NodeManager starts the ApplicationMaster, which registers with the ResourceManager's ASM.
4. The ApplicationMaster fetches the jar from HDFS and starts the SparkContext, the DAGScheduler and the YARN ClusterScheduler.
5. The ApplicationMaster requests container resources from the ResourceManager.
6. The ResourceManager tells the NodeManagers to allocate the containers, and reports about the containers are received from the ASM (each container corresponds to one executor).
7. The Spark ApplicationMaster then interacts directly with the containers (executors) to complete the distributed job.
Client mode:
In client mode the Driver runs on the client and obtains resources from the ResourceManager through an ApplicationMaster. The local Driver communicates with all the executor containers and collects the final results; killing the terminal kills the Spark application. In general this mode is used when the results only need to be returned to the terminal.
After the client-side Driver submits the application to YARN, YARN starts the ApplicationMaster and then the executors. Both the ApplicationMaster and the executors run inside containers, whose default memory is 1 GB; the ApplicationMaster is allocated driver-memory and each executor is allocated executor-memory. Because the Driver stays on the client, the program's results can be displayed there, and the Driver runs as a process named SparkSubmit.
1. Key points of Spark Streaming 2. The two ways of submitting jobs with Spark on YARN
1. Use Spark Streaming to implement accumulation across batches 2. Use Spark Streaming to implement window operations
1. What kinds of requirements can window operations satisfy? 2. Why is checkpointing needed when using Spark Streaming?